We've been working on some fun projects lately, namely dask and Anaconda Cluster, so we have been experimenting with analyzing the GitHub data for 2015 on an EC2 cluster (a distributed-memory environment). We use Anaconda Cluster to set up a 50-node cluster on EC2, then use dask for the analysis.

Dask is a tool for out-of-core, parallel data analysis. Recently we added a distributed-memory scheduler for running dask on clusters. We will also be using dask.bag, which provides an API for operations on unordered lists (like sets, but with duplicates). It is great for dealing with semi-structured data like JSON blobs or log files. More blogposts about dask can be found here or here.
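To give a feel for the dask.bag API before we scale up, here is a tiny local sketch on an in-memory sequence (the numbers are just toy data for illustration):

import dask.bag as db

# build a bag from a small in-memory sequence; real workloads usually start from files
numbers = db.from_sequence(range(8))

# bags compose functional operations lazily; compute() triggers execution
numbers.filter(lambda n: n % 2 == 0).map(lambda n: n ** 2).compute()
# [0, 4, 16, 36]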

Anaconda Cluster lets us easily set up clusters and manage the packages in them with conda. Running the cluster for this demo takes just a few lines:

acluster create my-cluster -p aws_profile  # create the cluster
acluster install notebook dask-cluster     # install plugins that set up an IPython notebook and a dask cluster
acluster conda install boto ujson          # install conda packages we need for the demo
acluster open notebook                     # open the IPython notebook in the browser to interact with the cluster

While dask.distributed is well integrated with Anaconda Cluster, it isn't restricted to it. This blogpost shows how to set up a dask.distributed network manually, and these docs show how to set it up from any IPyParallel cluster.

[Image: Projects for Python analytics in a distributed-memory environment]

GitHub Archive data on S3

We took data from githubarchive.com, from January 2015 through May 2015, and put it on S3. We chose S3 because there are nice Python libraries for interacting with it, and we can get great bandwidth from EC2 to S3. (The script for gathering this data is here.)

Let's inspect the data first so we can find something to analyze and learn the schema. You can inspect the data yourself in the githubarchive-data S3 bucket.
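If you want to poke around yourself, listing a few keys with boto looks roughly like this (a sketch, assuming boto can find your AWS credentials):

import boto
from itertools import islice

bucket = boto.connect_s3().get_bucket('githubarchive-data')  # public bucket
for key in islice(bucket.list(), 5):  # peek at the first few key names
    print(key.name)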


In [1]:
import boto
bucket = boto.connect_s3().get_bucket('githubarchive-data')  # this is a public bucket

from fnmatch import fnmatchcase
keys = bucket.list()
def data_size(key_str):
    """Total size in GB of all keys in the bucket whose names match
    key_str, a shell-style pattern that may contain ? and *.
    """
    matches = [k for k in keys if fnmatchcase(k.name, key_str)]
    return sum(k.size for k in matches) * 1e-9  # bytes -> GB

data_size('2015-*')


Out[1]:
28.289080878
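For reference, the patterns that data_size accepts are ordinary shell-style globs, where * matches any run of characters and ? matches a single character; a quick sketch:

from fnmatch import fnmatchcase

fnmatchcase('2015-01-01-0.json.gz', '2015-01-*')      # True
fnmatchcase('2015-01-15-23.json.gz', '2015-01-1?-*')  # True: ? matches the second digit of the day
fnmatchcase('2015-02-01-0.json.gz', '2015-01-*')      # False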

Inspect S3 data with dask.bag

We have approximately 28 GB of data: one file per hour, averaging around 7.8 MB each (compressed). So what does the schema look like, and how can we inspect it? We take one file and turn it into a dask.Bag for analysis on our local machine.


In [2]:
import dask.bag as db
import ujson as json

# take one file from the bucket and parse each line as a JSON object;
# note that gz decompression happens automatically at compute time
b = db.from_s3('githubarchive-data', '2015-01-01-0.json.gz').map(json.loads)

In [3]:
first = b.take(1)[0]  # take the first json object from the file
first


Out[3]:
{u'actor': {u'avatar_url': u'https://avatars.githubusercontent.com/u/9152315?',
  u'gravatar_id': u'',
  u'id': 9152315,
  u'login': u'davidjhulse',
  u'url': u'https://api.github.com/users/davidjhulse'},
 u'created_at': u'2015-01-01T00:00:00Z',
 u'id': u'2489368070',
 u'payload': {u'before': u'86ffa724b4d70fce46e760f8cc080f5ec3d7d85f',
  u'commits': [{u'author': {u'email': u'david.hulse@live.com',
     u'name': u'davidjhulse'},
    u'distinct': True,
    u'message': u'Altered BingBot.jar\n\nFixed issue with multiple account support',
    u'sha': u'a9b22a6d80c1e0bb49c1cf75a3c075b642c28f81',
    u'url': u'https://api.github.com/repos/davidjhulse/davesbingrewardsbot/commits/a9b22a6d80c1e0bb49c1cf75a3c075b642c28f81'}],
  u'distinct_size': 1,
  u'head': u'a9b22a6d80c1e0bb49c1cf75a3c075b642c28f81',
  u'push_id': 536740396,
  u'ref': u'refs/heads/master',
  u'size': 1},
 u'public': True,
 u'repo': {u'id': 28635890,
  u'name': u'davidjhulse/davesbingrewardsbot',
  u'url': u'https://api.github.com/repos/davidjhulse/davesbingrewardsbot'},
 u'type': u'PushEvent'}

In [4]:
first.keys()  # top level keys in this json object


Out[4]:
[u'payload', u'created_at', u'actor', u'public', u'repo', u'type', u'id']

The type field looks interesting. What are the possible types, and how often does each occur? We can inspect this with dask.bag's frequencies method.


In [5]:
%time b.pluck('type').frequencies().compute()


CPU times: user 8 ms, sys: 76 ms, total: 84 ms
Wall time: 1.28 s
Out[5]:
[(u'ReleaseEvent', 24),
 (u'PublicEvent', 2),
 (u'PullRequestReviewCommentEvent', 85),
 (u'ForkEvent', 213),
 (u'MemberEvent', 16),
 (u'PullRequestEvent', 315),
 (u'IssueCommentEvent', 650),
 (u'PushEvent', 4280),
 (u'DeleteEvent', 141),
 (u'CommitCommentEvent', 56),
 (u'WatchEvent', 642),
 (u'IssuesEvent', 373),
 (u'CreateEvent', 815),
 (u'GollumEvent', 90)]

Top Committers

So most events are pushes; that is not surprising. Let's ask, "Who pushes the most?"

We do this by filtering for PushEvents, then counting the frequencies of usernames across those pushes, and finally taking the top 5.


In [6]:
pushes = b.filter(lambda x: x['type'] == 'PushEvent')  # keep only the push events
names = pushes.pluck('actor').pluck('login') # get the login names
top_5 = names.frequencies().topk(5, key=lambda (name, count): count)  # List top 5 pushers
%time top_5.compute()  # run the above computations


CPU times: user 12 ms, sys: 64 ms, total: 76 ms
Wall time: 1.26 s
Out[6]:
[(u'KenanSulayman', 79),
 (u'mirror-updates', 42),
 (u'cm-gerrit', 35),
 (u'qdm', 29),
 (u'greatfire', 24)]

These users pushed the most, but a single push can contain multiple commits. So we can ask, "Who pushed the most commits?"

We can figure this out by grouping on username and then, for each user, summing the number of commits over all of their pushes. More technically speaking: we group on usernames, so for each username we get a list of their PushEvents; we then reduce each PushEvent to a count of its commits; and finally we sum those counts for each user. So we are grouping, then reducing.

However, there are algorithms that group and reduce simultaneously, avoiding expensive shuffle operations and running much faster. In dask.bag this is foldby. Analogous methods include toolz.reduceby and, in PySpark, RDD.combineByKey.
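For intuition, here is the same group-and-reduce pattern with toolz.reduceby on a tiny in-memory list (a sketch; the toy records are made up):

from toolz import reduceby

toy_pushes = [{'login': 'alice', 'n_commits': 2},
              {'login': 'bob',   'n_commits': 1},
              {'login': 'alice', 'n_commits': 3}]

# group by login and reduce each group by summing commit counts, all in one pass
reduceby(lambda e: e['login'],                     # key: which group a record belongs to
         lambda total, e: total + e['n_commits'],  # binop: fold a record into the running total
         toy_pushes, 0)                            # 0 is the initial total for each group
# {'alice': 5, 'bob': 1}

dask.bag's foldby additionally takes a combine function so that partial totals computed on different partitions (or workers) can be merged, which is exactly what we do below.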


In [7]:
def get_logins(x):
    """The key for foldby, like a groupby key. Get the username from a PushEvent"""
    return x['actor']['login']

def binop(total, x):
    """Count the number of commits in a PushEvent"""
    return total + len(x['payload']['commits'])

def combine(total1, total2):
    """This combines commit counts from PushEvents"""
    return total1 + total2

commits = pushes.foldby(get_logins, binop, initial=0, combine=combine)
top_commits = commits.topk(5, key=lambda (name, count): count)
%time top_commits.compute()


CPU times: user 12 ms, sys: 64 ms, total: 76 ms
Wall time: 1.28 s
Out[7]:
[(u'mirror-updates', 413),
 (u'jrmarino', 88),
 (u'javra', 80),
 (u'KenanSulayman', 79),
 (u'chcholman', 51)]

Recall that this dask.Bag holds only one file. Now that we know how to get the top committers, we'll gradually load more data and benchmark the dask.distributed scheduler against the default dask.multiprocessing scheduler.

Benchmarking dask.distributed

First we set up the distributed scheduler, then write a benchmarking script, both_benchmark (shown below; it can also be found here). Basically, it times the analysis with both schedulers and prints the results nicely.


In [8]:
# dask.distributed setup
import dask
from dask.distributed import Client

dc = Client('tcp://localhost:9000') # client connected to 50 nodes, 2 workers per node.
# pass dc.get to compute functions to use the distributed scheduler.

# make a top5 committers function
import time
from pprint import pprint
def both_benchmark(data_pattern):
    bag = db.from_s3('githubarchive-data', data_pattern).map(json.loads)
    pushes = bag.filter(lambda x: x['type'] == 'PushEvent')
    commits =  pushes.foldby(get_logins, binop, initial=0, combine=combine)
    top5 = commits.topk(5, lambda x: x[1])
    
    # time the default compute and the distributed compute
    default_start = time.time()
    default_result = top5.compute()
    default_time = time.time() - default_start
    
    dist_start = time.time()
    dist_result = top5.compute(get=dc.get)
    dist_time = time.time() - dist_start
    
    # assert we have the same result
    assert default_result == dist_result
    
    # size of the computed data
    size = data_size(data_pattern)
    
    # general details
    print("To compute {0:.4f} GB of data the default scheduler took {1:.2f} seconds, the distributed scheduler took {2:.2f} seconds".format(size, default_time, dist_time))
    print("")
    # speedup default_time / dist_time
    print("Distributed scheduler is \t\t\t\t\t{:.2f} times faster.".format(default_time / dist_time))
    
    # single node bandwidth = size / default_time
    print("Default scheduler compute bandwidth: \t\t\t\t{:.2f} MB/s".format(1e3 * size / default_time))
    # dist bandwidth = size / dist_time
    print("Distributed scheduler compute bandwidth: \t\t\t{:.2f} MB/s".format(1e3 * size / dist_time))
    
    # dist node bandwidth per node = size / (time * node)
    print("Compute bandwidth per node with distributed scheduler: \t\t{:.3f} MB/(s node)".format(1e3 * size / (dist_time * 50)))
    print('')
    print("Analysis results:")
    pprint(dist_result)
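
The key mechanism in both_benchmark is that the same dask graph can be run by different schedulers, chosen per call with the get= keyword; for example, with the top_commits bag from earlier (a minimal sketch, assuming dask.multiprocessing.get is the default scheduler for bags, as noted above):

import dask.multiprocessing

top_commits.compute(get=dask.multiprocessing.get)  # explicitly use the default multiprocessing scheduler
top_commits.compute(get=dc.get)                    # use the distributed scheduler through our client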

Let's benchmark a single file first.


In [9]:
both_benchmark('2015-01-01-0.json.gz')


To compute 0.0026 GB of data the default scheduler took 1.25 seconds, the distributed scheduler took 1.09 seconds

Distributed scheduler is 					1.15 times faster.
Default scheduler compute bandwidth: 				2.10 MB/s
Distributed scheduler compute bandwidth: 			2.41 MB/s
Compute bandwidth per node with distributed scheduler: 		0.048 MB/(s node)

Analysis results:
[(u'mirror-updates', 413),
 (u'jrmarino', 88),
 (u'javra', 80),
 (u'KenanSulayman', 79),
 (u'chcholman', 51)]

dask.distributed is comparable with the default scheduler; that is not surprising for this small amount of data.

1 day of data


In [10]:
both_benchmark('2015-01-15-*.json.gz')


To compute 0.1952 GB of data the default scheduler took 36.58 seconds, the distributed scheduler took 4.29 seconds

Distributed scheduler is 					8.52 times faster.
Default scheduler compute bandwidth: 				5.34 MB/s
Distributed scheduler compute bandwidth: 			45.49 MB/s
Compute bandwidth per node with distributed scheduler: 		0.910 MB/(s node)

Analysis results:
[(u'mirror-updates', 9912),
 (u'KenanSulayman', 1848),
 (u'peff', 1140),
 (u'bors', 972),
 (u'dougclarknc', 887)]

Already a good speedup.

10 days of data


In [11]:
both_benchmark('2015-01-1?-*.json.gz')


To compute 1.5871 GB of data the default scheduler took 258.54 seconds, the distributed scheduler took 14.82 seconds

Distributed scheduler is 					17.45 times faster.
Default scheduler compute bandwidth: 				6.14 MB/s
Distributed scheduler compute bandwidth: 			107.11 MB/s
Compute bandwidth per node with distributed scheduler: 		2.142 MB/(s node)

Analysis results:
[(u'mirror-updates', 98297),
 (u'KenanSulayman', 18556),
 (u'qdm', 7012),
 (u'mAAdhaTTah', 6893),
 (u'greatfire', 5563)]

Computing this on one node is possible, but it is annoying to wait so long, so we continue with just the distributed scheduler, using the distributed_benchmark function below (which can also be found here).


In [12]:
def distributed_benchmark(data_pattern):
    bag = db.from_s3('githubarchive-data', data_pattern).map(json.loads)
    pushes = bag.filter(lambda x: x['type'] == 'PushEvent')
    commits =  pushes.foldby(get_logins, binop, initial=0, combine=combine)
    top5 = commits.topk(5, lambda x: x[1])

    dist_start = time.time()
    dist_result = top5.compute(get=dc.get)
    dist_time = time.time() - dist_start

    # size of the computed data
    size = data_size(data_pattern)
    
    # general details
    print("To compute {0:.4f} GB of data the distributed scheduler took {1:.2f} seconds".format(size, dist_time))
    print('')
   
    # dist bandwidth = size / dist_time
    print("Distributed scheduler compute bandwidth: \t\t\t{:.2f} MB/s".format(1e3 * size / dist_time))
    
    # dist node bandwidth per node = size / (time * node)
    print("Compute bandwidth per node with distributed scheduler: \t\t{:.3f} MB/(s node)".format(1e3 * size / (dist_time * 50)))
    print('')
    
    print("Analysis results:")
    pprint(dist_result)

January 2015


In [13]:
distributed_benchmark('2015-01-*.json.gz')


To compute 5.0187 GB of data the distributed scheduler took 43.14 seconds

Distributed scheduler compute bandwidth: 			116.35 MB/s
Compute bandwidth per node with distributed scheduler: 		2.327 MB/(s node)

Analysis results:
[(u'mirror-updates', 302755),
 (u'greatfire', 57019),
 (u'KenanSulayman', 56557),
 (u'qdm', 20964),
 (u'greatfire-martin', 19594)]

January - May 2015


In [14]:
distributed_benchmark('2015-*.json.gz')


To compute 28.2891 GB of data the distributed scheduler took 246.21 seconds

Distributed scheduler compute bandwidth: 			114.90 MB/s
Compute bandwidth per node with distributed scheduler: 		2.298 MB/(s node)

Analysis results:
[(u'mirror-updates', 1463019),
 (u'KenanSulayman', 235300),
 (u'greatfirebot', 167558),
 (u'rydnr', 133323),
 (u'markkcc', 127625)]

Final Thoughts

This is experimental work. We had the following problems when doing this experiment:

  • Hard to deploy - solved by making a "dask-cluster" plugin for anaconda-cluster
  • Investigating the state of the distributed network is hard - partially solved by providing clients views of the workers
  • Profiling distributed computation is hard - in the future we'll try applying new dask profiling methods to the distributed scheduler.

We also have some lingering issues regarding performance:

  • Why does the distributed scheduler achieve lower throughput per node than the single-node scheduler? This computation should be embarrassingly parallel.
  • 6 MB/s of compressed-data throughput on a single node is nice, but we can probably do better. As always, we should think about single-core performance before we "go big" with a cluster.